#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "msgqueue.h"
#include "../../storage/chunk/chunk_ops.h"
#include "../../storage/storage/locator_rpc.h"
#include "license.h"
#include "utils.h"
#include "dbg.h"

/* Size passed to msgqueue_load(): 10G; per the original note, enough to hold 30T of chunks. */
#define TEST_MSGQUEUE_SIZE ((LLU)1024*1024*1024*10)
/* Number of messages main() asks the push thread to write on every pass. */
#define TEST_CHUNK_COUNT (1024*1024*30)

/*
 * Size of one record as read back by test_pop(): a msgqueue_msg_t followed by
 * a cleanup_msg_t payload.
 */
#define CHUNK_CLEANUP_MSG_SIZE (sizeof(cleanup_msg_t) + sizeof(msgqueue_msg_t))

/*
 * Payload of one message pushed through the queue by this test.
 * test_push() assigns chkid.id and pool; test_pop() only checks the
 * payload's length and CRC.
 */
typedef struct {
        /* NOTE(review): presumably the file owning the chunk; not referenced elsewhere in this file */
        fileid_t parent;
        /* test_push() stores the loop index in chkid.id */
        chkid_t chkid;
        /* not referenced elsewhere in this file */
        uint64_t meta_version;
        /* NUL-terminated name; test_push() writes "pool<index>" here */
        char pool[MAX_NAME_LEN];
} cleanup_msg_t;

/**
 * Write test: push `count` cleanup messages into the queue.
 *
 * Every message carries the loop index in chkid.id and the string
 * "pool<index>" in pool; all remaining bytes of the message are zero.
 *
 * @param queue  queue to push into
 * @param count  number of messages to push; nothing is pushed when <= 0
 *
 * @return 0 on success, otherwise the non-zero value returned by the first
 *         failing msgqueue_push() call (pushing stops at that point)
 */
int test_push(msgqueue_t *queue, int count)
{
        int ret, i;
        cleanup_msg_t msg;

        /*
         * Zero the whole record once: only chkid.id and pool are assigned
         * below, and without this the remaining fields and padding would be
         * pushed with indeterminate stack contents.
         */
        memset(&msg, 0, sizeof(msg));

        for (i = 0; i < count; i++) {
                msg.chkid.id = i;
                snprintf(msg.pool, sizeof(msg.pool), "pool%d", i);
                ret = msgqueue_push(queue, &msg, sizeof(msg));
                if (ret) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Drain the queue and verify what comes out.
 *
 * Repeatedly pops a buffer holding a whole number of records until
 * msgqueue_pop() returns 0, asserting for each record that its length equals
 * sizeof(cleanup_msg_t) and that its stored CRC matches crc32_sum() of the
 * payload.
 *
 * Returns 0 once msgqueue_pop() reports 0. A negative pop result trips
 * YASSERT(0); if execution continues past it, the negated value is returned.
 */
int test_pop(msgqueue_t *queue)
{
        int ret, idx, want, nrec;
        char buf[MAX_BUF_LEN];
        msgqueue_msg_t *rec;

        /* Largest multiple of the record size that fits in buf. */
        want = MAX_BUF_LEN;
        want = (want / CHUNK_CLEANUP_MSG_SIZE) * CHUNK_CLEANUP_MSG_SIZE;

        for (;;) {
                ret = msgqueue_pop(queue, buf, want);
                if (ret < 0) {
                        YASSERT(0);
                        ret = -ret;
                        GOTO(err_ret, ret);
                }

                if (ret == 0)
                        break;

                /* ret is the number of bytes returned; walk them record by record. */
                nrec = ret / CHUNK_CLEANUP_MSG_SIZE;
                for (idx = 0; idx < nrec; idx++) {
                        rec = (msgqueue_msg_t *)(buf + idx * CHUNK_CLEANUP_MSG_SIZE);

                        YASSERT(rec->len == sizeof(cleanup_msg_t));
                        YASSERT(rec->crc == crc32_sum(rec->buf, rec->len));
                }
        }

        return 0;
err_ret:
        return ret;
}

/* Argument block handed to __test_pop_thread() by test_pop_start(). */
typedef struct {
        /* queue the pop thread reads from */
        msgqueue_t *queue;
} test_pop_arg_t;

static void *__test_pop_thread(void *_arg)
{
        test_pop_arg_t *arg = _arg;

        while (1) {
                printf("pop... \n");
                test_pop(arg->queue);
                printf("pop ok, wait retry\n");
                sleep(30);
        }
        return NULL;
}

/*
 * Start a detached thread that runs __test_pop_thread() on `queue`.
 *
 * The thread argument is allocated with ymalloc() and handed to the thread;
 * it is never released because the thread loops forever.
 *
 * Returns 0 once the thread has been created, otherwise the non-zero value
 * from ymalloc() or pthread_create().
 */
int test_pop_start(msgqueue_t *queue)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        test_pop_arg_t *arg;

        ret = ymalloc((void**)&arg, sizeof(*arg));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg->queue = queue;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __test_pop_thread, (void *)arg);

        /* The attribute object is only needed for creation; release it on both outcomes. */
        (void) pthread_attr_destroy(&ta);

        /*
         * NOTE(review): arg is not released when pthread_create() fails; the
         * only caller in this file aborts on failure.
         */
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Argument block handed to __test_push_thread() by test_push_start(). */
typedef struct {
        /* queue the push thread writes to */
        msgqueue_t *queue;
        /* number of messages passed to test_push() on every pass */
        int count;
} test_push_arg_t;

static void *__test_push_thread(void *_arg)
{
        test_push_arg_t *arg = _arg;

        while (1) {
                printf("push...\n");
                test_push(arg->queue, arg->count);
                printf("push ok, wait retry\n");
                sleep(3);
        }

        return NULL;
}

/*
 * Start a detached thread that runs __test_push_thread() on `queue`,
 * pushing `count` messages on every pass.
 *
 * The thread argument is allocated with ymalloc() and handed to the thread;
 * it is never released because the thread loops forever.
 *
 * Returns 0 once the thread has been created, otherwise the non-zero value
 * from ymalloc() or pthread_create().
 */
int test_push_start(msgqueue_t *queue, int count)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        test_push_arg_t *arg;

        ret = ymalloc((void**)&arg, sizeof(*arg));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg->queue = queue;
        arg->count = count;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __test_push_thread, (void *)arg);

        /* The attribute object is only needed for creation; release it on both outcomes. */
        (void) pthread_attr_destroy(&ta);

        /*
         * NOTE(review): arg is not released when pthread_create() fails; the
         * only caller in this file aborts on failure.
         */
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * msgqueue soak test.
 *
 * Usage: msgqueue_test /msg_home/
 *
 * Initialises the configuration, loads the queue rooted at argv[1], starts
 * the push thread and the pop thread, then sleeps for 16 hours while they
 * run. The "ok" lines printed here only mean the corresponding thread was
 * started, since the start helpers return right after pthread_create().
 *
 * Any setup failure ends in YASSERT(0).
 */
int main(int argc, char *argv[])
{
        int ret;
        const char *home;
        msgqueue_t queue;

        if (argc < 2) {
                fprintf(stderr, "msgqueue_test /msg_home/\n");
                EXIT(1);
        }

        home = argv[1];

        ret = conf_init();
        if (ret)
                GOTO(err_ret, ret);

        ret = msgqueue_load(&queue, home,
                        CHUNK_CLEANUP_MSG_SIZE, TEST_MSGQUEUE_SIZE, 1);
        if (ret)
                GOTO(err_ret, ret);

#if 1
        /* writer side */
        printf("start test write, time %d\n", (int)gettime());
        ret = test_push_start(&queue, TEST_CHUNK_COUNT);
        if (ret)
                GOTO(err_ret, ret);
        printf("test write ok, time %d\n", (int)gettime());
#endif

#if 1
        /* reader side */
        printf("test pop start, time %d\n", (int)gettime());
        ret = test_pop_start(&queue);
        if (ret)
                GOTO(err_ret, ret);
        printf("test pop ok, time %d\n", (int)gettime());
#endif

        /* Both workers are detached; stay alive for 16 hours while they run. */
        sleep(16 * 3600);

        return 0;
err_ret:
        YASSERT(0);
        return ret;
}
